{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# What are we going to do?\n",
    "\n",
    "We will build a data processing pipeline that takes our data sources performs aggregations by precise boundaries. Then run a weighted algorithm to calculate a \"cultural\" score for each city normalized by its total population. So the pipeline is something like this:\n",
    "\n",
    "![](images/sketch.png)\n",
    "\n",
    "## What do we mean by culture/cultural score?\n",
    "\n",
    "This is a completely subjective and our algorithm is therefore only meant as an excuse to have fun learning PySpark and Spatial Analysis. We collect data from OpenStreetMap that seems culturally relevant. These include the following tags:\n",
    "\n",
    "    \"tourism\"=\"artwork\"\n",
    "    \"tourism\"=\"gallery\"\n",
    "    \"amenity\"=\"theatre\"\n",
    "    \"amenity\"=\"arts_centre\"\n",
    "    \"tourism\"=\"museum\"\n",
    "\n",
    "We will also assign different weights to each tag. From 1 to 5. The tags above are listed from highest (artwork) to lowest (museum). This should make our analysis interesting. We will calculate a score by performing a weighted count for each of the sites found per city. Finally we will normalize our count by dividing it by the total population of the city."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/imports.py'\n",
    "import notebook\n",
    "import os.path, json, io, pandas\n",
    "import matplotlib.pyplot as plt\n",
    "import matplotlib\n",
    "matplotlib.rcParams['figure.figsize'] = (16, 20)\n",
    "\n",
    "from retrying import retry # for exponential back down when calling TurboOverdrive API\n",
    "\n",
    "import pyspark.sql.functions as func # resuse as func.coalace for example\n",
    "from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType,DecimalType\n",
    "\n",
    "import pandas as pandas\n",
    "from geopandas import GeoDataFrame # Loading boundaries Data\n",
    "from shapely.geometry import Point, Polygon, shape # creating geospatial data\n",
    "from shapely import wkb, wkt # creating and parsing geospatial data\n",
    "import overpy # OpenStreetMap API\n",
    "\n",
    "from ast import literal_eval as make_tuple # used to decode data from java\n",
    "\n",
    "# make sure nbextensions are installed\n",
    "notebook.nbextensions.check_nbextension('usability/codefolding', user=True)\n",
    "\n",
    "try:\n",
    "    sc\n",
    "except NameError:\n",
    "    import pyspark\n",
    "    sc = pyspark.SparkContext('local[*]')\n",
    "    sqlContext = pyspark.sql.SQLContext(sc)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/osm2geojson.py'\n",
    "### Helper functions\n",
    "\n",
    "# given shapely bounds return bbox compatiable with overpass turbo openstreetmap API\n",
    "def bbox(bounds):\n",
    "    return (bounds[1],bounds[0],bounds[3],bounds[2])\n",
    "\n",
    "# given an openstreetmap node retrun a GeoJSON feature\n",
    "def nodeToFeature(node):\n",
    "    properties = node.tags\n",
    "    properties['wkt'] = Point(node.lon, node.lat).wkt\n",
    "    return {\n",
    "        \"type\": \"Feature\",\n",
    "        \"properties\": properties,\n",
    "        \"geometry\": {\n",
    "            \"type\": \"Point\",\n",
    "            \"coordinates\": [\n",
    "                float(node.lon),\n",
    "                float(node.lat)\n",
    "            ]\n",
    "        }\n",
    "    }\n",
    "\n",
    "# given an array of nodes return an array of GeoJSON features\n",
    "def nodesToFeatures(nodes):\n",
    "    \"\"\"\n",
    "    :param nodes\n",
    "    :type nodes from overpy.Result (result.nodes)\n",
    "    :return:\n",
    "    \"\"\"\n",
    "    features = []\n",
    "    for node in nodes:\n",
    "        features.append(nodeToFeature(node))\n",
    "    return features\n",
    "\n",
    "def waysToFeatures(ways):\n",
    "    print ways\n",
    "    features = []\n",
    "    return features\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/query_overpass_api.py'\n",
    "# method to handle OverpassTooManyRequests exception from OpenStreetMap/overpass turbo API\n",
    "def retry_if_overpass_too_many_requests(exception):\n",
    "    return isinstance(exception, overpy.exception.OverpassTooManyRequests)\n",
    "\n",
    "# decorator to retry with exponential back off\n",
    "@retry(wait_exponential_multiplier=2000,\n",
    "       wait_exponential_max=60000,\n",
    "       retry_on_exception=retry_if_overpass_too_many_requests)\n",
    "def call_overpass_api(q):\n",
    "    return OVERPASS_API.query(q)\n",
    "\n",
    "def run_overpass_api(bounding_geo_df):\n",
    "    local_pois = []\n",
    "    for index, row in bounding_geo_df.iterrows():\n",
    "        # For documentation see:\n",
    "        # http://wiki.openstreetmap.org/wiki/Tag:{key}={value}\n",
    "        # e.g: http://wiki.openstreetmap.org/wiki/Tag:amenity=theatre\n",
    "        payload = \"\"\"\n",
    "            [out:json][timeout:60];\n",
    "            (\n",
    "              node[\"tourism\"=\"gallery\"]%(box)s;\n",
    "              node[\"tourism\"=\"artwork\"]%(box)s;\n",
    "              node[\"tourism\"=\"museum\"]%(box)s;\n",
    "              node[\"amenity\"=\"arts_centre\"]%(box)s;\n",
    "              node[\"amenity\"=\"theatre\"]%(box)s;\n",
    "            );\n",
    "            out body;\"\"\" % {'box': str(bbox(row.geometry.bounds))}\n",
    "        result = call_overpass_api(payload)\n",
    "        local_pois.extend(nodesToFeatures(result.nodes))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "OVERPASS_API         = overpy.Overpass()\n",
    "BASE_DIR             = os.path.join(os.path.abspath('.'), 'work-flow')\n",
    "URBAN_BOUNDARIES_FILE = '06_Europe_Cities_Boundaries_with_Labels_Population.geo.json'\n",
    "\n",
    "# Paths to base datasets that we are using:\n",
    "URBAN_BOUNDARIES_PATH = os.path.join(BASE_DIR,URBAN_BOUNDARIES_FILE)\n",
    "POIS_PATH            = os.path.join(BASE_DIR, \"pois.json\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.9"
  },
  "toc": {
   "toc_cell": false,
   "toc_number_sections": true,
   "toc_threshold": 6,
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
